/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.fn.data;



import java.util.concurrent.CompletableFuture;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ConcurrentMap;

import java.util.concurrent.ExecutionException;

import java.util.function.Consumer;

import javax.annotation.Nullable;

import com.bff.gaia.unified.model.fnexecution.v1.UnifiedFnApi;

import com.bff.gaia.unified.model.fnexecution.v1.UnifiedFnApi.Elements;

import com.bff.gaia.unified.model.fnexecution.v1.UnifiedFnApi.Elements.Data;

import com.bff.gaia.unified.model.pipeline.v1.Endpoints;

import com.bff.gaia.unified.sdk.fn.stream.OutboundObserverFactory;

import com.bff.gaia.unified.vendor.grpc.v1p13p1.io.grpc.Status;

import com.bff.gaia.unified.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



/**

 * A gRPC multiplexer for a specific {@link Endpoints.ApiServiceDescriptor}.

 *

 * <p>Multiplexes data for inbound consumers based upon their individual {@link

 * com.bff.gaia.unified.model.fnexecution.v1.UnifiedFnApi.Target}s.

 *

 * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those streams.

 * For inbound streams, this is as thread safe as the inbound observers. For outbound streams, this

 * is as thread safe as the underlying stream observer.

 *

 * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output

 * location with a specific outbound observer.

 */

public class UnifiedFnDataGrpcMultiplexer implements AutoCloseable {

  private static final Logger LOG = LoggerFactory.getLogger(UnifiedFnDataGrpcMultiplexer.class);

  @Nullable

  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;

  private final StreamObserver<UnifiedFnApi.Elements> inboundObserver;

  private final StreamObserver<UnifiedFnApi.Elements> outboundObserver;

  private final ConcurrentMap<LogicalEndpoint, CompletableFuture<Consumer<UnifiedFnApi.Elements.Data>>>

      consumers;



  public UnifiedFnDataGrpcMultiplexer(

      @Nullable Endpoints.ApiServiceDescriptor apiServiceDescriptor,

      OutboundObserverFactory outboundObserverFactory,

      OutboundObserverFactory.BasicFactory<UnifiedFnApi.Elements, UnifiedFnApi.Elements> baseOutboundObserverFactory) {

    this.apiServiceDescriptor = apiServiceDescriptor;

    this.consumers = new ConcurrentHashMap<>();

    this.inboundObserver = new InboundObserver();

    this.outboundObserver =

        outboundObserverFactory.outboundObserverFor(baseOutboundObserverFactory, inboundObserver);

  }



  @Override

  public String toString() {

    return MoreObjects.toStringHelper(this)

        .omitNullValues()

        .add("apiServiceDescriptor", apiServiceDescriptor)

        .add("consumers", consumers)

        .toString();

  }



  public StreamObserver<UnifiedFnApi.Elements> getInboundObserver() {

    return inboundObserver;

  }



  public StreamObserver<Elements> getOutboundObserver() {

    return outboundObserver;

  }



  private CompletableFuture<Consumer<Data>> receiverFuture(LogicalEndpoint endpoint) {

    return consumers.computeIfAbsent(

        endpoint, (LogicalEndpoint unused) -> new CompletableFuture<>());

  }



  public void registerConsumer(

      LogicalEndpoint inputLocation, Consumer<Data> dataBytesReceiver) {

    receiverFuture(inputLocation).complete(dataBytesReceiver);

  }



  @VisibleForTesting

  boolean hasConsumer(LogicalEndpoint outputLocation) {

    return consumers.containsKey(outputLocation);

  }



  @Override

  public void close() {

    for (CompletableFuture<Consumer<Data>> receiver :

        ImmutableList.copyOf(consumers.values())) {

      // Cancel any observer waiting for the client to complete. If the receiver has already been

      // completed or cancelled, this call will be ignored.

      receiver.cancel(true);

    }

    // Cancel any outbound calls and complete any inbound calls, as this multiplexer is hanging up

    outboundObserver.onError(

        Status.CANCELLED.withDescription("Multiplexer hanging up").asException());

    inboundObserver.onCompleted();

  }



  /**

   * A multiplexing {@link StreamObserver} that selects the inbound {@link Consumer} to pass the

   * elements to.

   *

   * <p>The inbound observer blocks until the {@link Consumer} is bound allowing for the sending

   * harness to initiate transmitting data without needing for the receiving harness to signal that

   * it is ready to consume that data.

   */

  private final class InboundObserver implements StreamObserver<Elements> {

    @Override

    public void onNext(UnifiedFnApi.Elements value) {

      for (UnifiedFnApi.Elements.Data data : value.getDataList()) {

        try {

          LogicalEndpoint key =

              LogicalEndpoint.of(data.getInstructionReference(), data.getTarget());

          CompletableFuture<Consumer<Data>> consumer = receiverFuture(key);

          if (!consumer.isDone()) {

            LOG.debug(

                "Received data for key {} without consumer ready. "

                    + "Waiting for consumer to be registered.",

                key);

          }

          consumer.get().accept(data);

          if (data.getData().isEmpty()) {

            consumers.remove(key);

          }

          /*

           * TODO: On failure we should fail any bundles that were impacted eagerly

           * instead of relying on the Runner harness to do all the failure handling.

           */

        } catch (ExecutionException | InterruptedException e) {

          LOG.error(

              "Client interrupted during handling of data for instruction {} and target {}",

              data.getInstructionReference(),

              data.getTarget(),

              e);

          outboundObserver.onError(e);

        } catch (RuntimeException e) {

          LOG.error(

              "Client failed to handle data for instruction {} and target {}",

              data.getInstructionReference(),

              data.getTarget(),

              e);

          outboundObserver.onError(e);

        }

      }

    }



    @Override

    public void onError(Throwable t) {

      LOG.error(

          "Failed to handle for {}",

          apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor,

          t);

    }



    @Override

    public void onCompleted() {

      LOG.warn(

          "Hanged up for {}.",

          apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor);

    }

  }

}